Random Forests
Problem Statement
The input data contains surveyed information about potential
customers of a bank. The goal is to build a model that predicts
whether a prospect would become a customer of the bank if
contacted through a marketing campaign.
Techniques used:
1. Random Forests
2. Training and Testing
3. Confusion Matrix
4. Indicator Variables
5. Variable Reduction
# -*- coding: utf-8 -*-
import os
os.chdir("/home/cloudops/spark")
os.getcwd()   # confirm the working directory (os.curdir would only return ".")
# =====================================
# Load the CSV file into an RDD
# The file is semicolon separated
bankData = sc.textFile("data/bank.csv")
bankData.cache()
bankData.count() # 542
# Remove the first line (contains headers)
firstLine = bankData.first()
# print(firstLine)
# Column indices after splitting on ";":
#  0:"age"        1:"job"       2:"marital"    3:"education"   4:"default"   5:"balance"
#  6:"housing"    7:"loan"      8:"contact"    9:"day"        10:"month"    11:"duration"
# 12:"campaign"  13:"pdays"    14:"previous"  15:"poutcome"   16:"y"
dataLines = bankData.filter(lambda x: x != firstLine)
dataLines.count() # 541
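# Optional alternative (a sketch, not used below): the built-in CSV reader can
# handle the header row and the ";" separator directly and return a DataFrame
# instead of an RDD. Assumes a SparkSession named `spark` is available
# (it is created automatically in recent PySpark shells).
# bankRawDF = spark.read.csv("data/bank.csv", header=True, sep=";",
#                            quote="\"", inferSchema=True)
# bankRawDF.count()   # 541 rows, header already excluded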
# =====================================
# Convert each row of the RDD into a DenseVector
# 1. Change labels to numeric ones
# Note: use Vectors from pyspark.ml.linalg, not the older pyspark.mllib.linalg
# (the mllib vector type triggers the PCA error shown further down)
# from pyspark.mllib.linalg import Vectors
from pyspark.ml.linalg import Vectors
def transformToNumeric(inputStr):
    # remove the quotes and split by ';'
    attList = inputStr.replace("\"", "").split(";")
    age = float(attList[0])
    # outcome - convert to float
    outcome = 0.0 if attList[16] == "no" else 1.0
    # marital status
    # create indicator variables for single/married/divorced - 3 columns
    single = 1.0 if attList[2] == "single" else 0.0
    married = 1.0 if attList[2] == "married" else 0.0
    divorced = 1.0 if attList[2] == "divorced" else 0.0
    # education
    # create indicator variables for education - 3 columns
    primary = 1.0 if attList[3] == "primary" else 0.0
    secondary = 1.0 if attList[3] == "secondary" else 0.0
    tertiary = 1.0 if attList[3] == "tertiary" else 0.0
    # default - convert to float
    default = 0.0 if attList[4] == "no" else 1.0
    # balance - convert to float
    balance = float(attList[5])
    # loan - convert to float
    loan = 0.0 if attList[7] == "no" else 1.0
    # Keep only the columns wanted at this stage; the other survey fields are dropped
    values = Vectors.dense([outcome, age, single, married,
                            divorced, primary, secondary, tertiary,
                            default, balance, loan])
    return values
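# Quick sanity check of the parser on the first raw data line (a hedged
# spot-check; the expected vector simply restates the first collected row
# shown a few lines below).
transformToNumeric(dataLines.first())
# DenseVector([0.0, 30.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1787.0, 0.0])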
# Change to a Vector
bankVectors = dataLines.map(transformToNumeric)
bankVectors.collect()[:15]
# [DenseVector([0.0, 30.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1787.0, 0.0]),
# DenseVector([1.0, 33.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 4789.0, 1.0]),
# DenseVector([1.0, 35.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1350.0, 0.0]),
# DenseVector([1.0, 30.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1476.0, 1.0]),
# . . .
# =====================================
# Perform statistical Analysis
# =====================================
from pyspark.mllib.stat import Statistics
bankStats=Statistics.colStats(bankVectors)
bankStats.mean()
# array([3.97412200e-01, 4.12698706e+01, 2.75415896e-01, 6.15526802e-01,
# 1.09057301e-01, 1.53419593e-01, 4.95378928e-01, 3.14232902e-01,
# 2.21811460e-02, 1.44478189e+03, 1.62661738e-01])
bankStats.variance()
# array([2.39919217e-01, 1.11415924e+02, 1.99931540e-01, 2.37091805e-01,
# 9.73437393e-02, 1.30122544e-01, 2.50441569e-01, 2.15889642e-01,
# 2.17293079e-02, 5.87224851e+06, 1.36455124e-01])
bankStats.min()
bankStats.max()
Statistics.corr(bankVectors)
# array([[ 1. , -0.18232104, 0.46323285, -0.37532413, -0.0781266 ,
# -0.12561549, 0.02639277, 0.08494841, -0.04536965, 0.03657487,
# -0.03042059],
# . . .
# Low correlation overall - maybe a different set of columns would provide more signal
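# Hedged helper (not part of the original flow): list each feature's correlation
# with the outcome, i.e. row 0 of the correlation matrix, to see which columns
# carry any signal. Feature names follow the order used in transformToNumeric.
featureNames = ["age", "single", "married", "divorced", "primary",
                "secondary", "tertiary", "default", "balance", "loan"]
corrMatrix = Statistics.corr(bankVectors)
for name, corr in zip(featureNames, corrMatrix[0][1:]):
    print(name, round(float(corr), 3))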
# =====================================
# Transform DenseVector -> (label, features) pair -> DataFrame
# Column 0 becomes the label; the remaining 10 columns become the features
# =====================================
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
def transformToLabeledPoint(inStr):
    lp = (float(inStr[0]),
          Vectors.dense([inStr[1], inStr[2], inStr[3],
                         inStr[4], inStr[5], inStr[6], inStr[7],
                         inStr[8], inStr[9], inStr[10]]))
    return lp
# Convert each DenseVector to a (label, features) pair
bankLp = bankVectors.map(transformToLabeledPoint)
bankLp.collect()
# [(0.0, DenseVector([30.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1787.0, 0.0])),
# (1.0, DenseVector([33.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 4789.0, 1.0])),
# (1.0, DenseVector([35.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1350.0, 0.0])),
# (1.0, DenseVector([30.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1476.0, 1.0])),
# Convert the (label, features) pairs to a SQL DataFrame
bankDF = sqlContext.createDataFrame(bankLp, ["label", "features"])
bankDF.select("label","features").show(5)
# +-----+--------------------+
# |label| features|
# +-----+--------------------+
# | 0.0|[30.0,0.0,1.0,0.0...|
# | 1.0|[33.0,0.0,1.0,0.0...|
# | 1.0|[35.0,1.0,0.0,0.0...|
# | 1.0|[30.0,0.0,1.0,0.0...|
# | 0.0|[59.0,0.0,1.0,0.0...|
# +-----+--------------------+
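# Hedged check on the class balance (useful context for judging accuracy later);
# the ~0.40 mean of the outcome column above already suggests roughly 40% "yes".
bankDF.groupBy("label").count().show()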
# =====================================
# Perform Principal Component Analysis (PCA)
# =====================================
from pyspark.ml.feature import PCA
# top 3 principal components
bankPCA = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
pcaModel = bankPCA.fit(bankDF)
# ERROR seen when pyspark.mllib.linalg.Vectors was used instead of pyspark.ml.linalg.Vectors:
# IllegalArgumentException: 'requirement failed: Column features must be of type
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'
# (the expected and actual types print identically because the two vector UDTs
# share the same SQL representation)
# Fix: recreate everything starting from sc with the ml Vectors import - then the fit works
pcaResult = pcaModel.transform(bankDF).select("label","pcaFeatures")
pcaResult.show(truncate=False)
# +-----+------------------------------------------------------------+
# |label|pcaFeatures |
# +-----+------------------------------------------------------------+
# |0.0 |[-1787.018897197381,28.86209683775509,-0.06459982604832398] |
# |1.0 |[-4789.020177138492,29.922562636341418,-0.983024351309942] |
# |1.0 |[-1350.0222131632622,34.10110809796672,0.8951427168281594] |
# |1.0 |[-1476.0189517184558,29.05133399359654,0.39527238680255716] |
# . . .
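# Hedged check: PCAModel exposes the variance explained by each component
# (Spark 2.0+). Since the unscaled balance column dominates the total variance
# (see bankStats.variance() above), the first component is expected to capture
# nearly all of it.
print(pcaModel.explainedVariance)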
# =====================================
# Indexing the label is a prerequisite for the tree-based classifiers (Decision Trees / Random Forests)
# =====================================
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(pcaResult)
td = si_model.transform(pcaResult)
td.collect()
# [Row(label=0.0, pcaFeatures=DenseVector([-1787.0189, 28.8621, -0.0646]), indexed=0.0),
# Row(label=1.0, pcaFeatures=DenseVector([-4789.0202, 29.9226, -0.983]), indexed=1.0),
# Row(label=1.0, pcaFeatures=DenseVector([-1350.0222, 34.1011, 0.8951]), indexed=1.0),
# Row(label=1.0, pcaFeatures=DenseVector([-1476.019, 29.0513, 0.3953]), indexed=1.0),
# . . .
# =====================================
# Split into training and testing data
# =====================================
(trainingData, testData) = td.randomSplit([0.7, 0.3])
trainingData.count() # 386
testData.count() # 155
testData.collect()
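# Hedged note: randomSplit also accepts a seed, which makes the split (and the
# counts above) reproducible across runs, e.g.:
# (trainingData, testData) = td.randomSplit([0.7, 0.3], seed=1234)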
# =====================================
# Create the model
# =====================================
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
rfClassifier = RandomForestClassifier(labelCol="indexed",
                                      featuresCol="pcaFeatures")
rfModel = rfClassifier.fit(trainingData)
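# Hedged tuning sketch (not run here): the classifier exposes the usual forest
# hyperparameters such as numTrees and maxDepth (Spark defaults are 20 trees of
# depth 5); the values below are only illustrative, not tuned for this dataset.
# rfClassifier = RandomForestClassifier(labelCol="indexed",
#                                       featuresCol="pcaFeatures",
#                                       numTrees=50, maxDepth=8, seed=42)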
# Predict on the test data
predictions = rfModel.transform(testData)
predictions.select("prediction","indexed","label","pcaFeatures").collect()
# [Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-12186.027, 38.1689, -0.9686])),
# Row(prediction=1.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-10924.0305, 43.984, 0.1742])),
# Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-9009.0251, 36.2106, 0.4038])),
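# Hedged inspection: RandomForestClassificationModel exposes the relative
# importance of each input feature (here the three PCA components).
print(rfModel.featureImportances)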
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                              labelCol="indexed",
                                              metricName="accuracy")
# Note: metricName="precision" fails here with "metricName given invalid value
# precision"; valid choices include "f1", "weightedPrecision", "weightedRecall"
# and "accuracy", so "accuracy" is used above.
evaluator.evaluate(predictions)
# 0.6451612903225806 - small dataset and weak correlation between features and target
# Draw a confusion matrix
labelList = predictions.select("indexed","label").distinct().toPandas()
predictions.groupBy("indexed","prediction").count().show()
# +-------+----------+-----+
# |indexed|prediction|count|
# +-------+----------+-----+
# | 1.0| 1.0| 22|
# | 0.0| 1.0| 15|
# | 1.0| 0.0| 40|
# | 0.0| 0.0| 78|
# +-------+----------+-----+
# Note on accuracy: it is most meaningful when the classes are roughly even
# (about 50% yes / 50% no); here only ~40% of prospects are "yes" (see the
# outcome mean above), so precision and recall are worth checking as well.
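# Hedged follow-up: the same evaluator class also supports precision/recall-style
# metrics, which are more informative than raw accuracy on imbalanced data.
for metric in ["weightedPrecision", "weightedRecall", "f1"]:
    otherEvaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                       labelCol="indexed",
                                                       metricName=metric)
    print(metric, otherEvaluator.evaluate(predictions))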
# =========================================================
# Another example
# =========================================================
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("PCAExample") \
    .getOrCreate()
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
# With the older pyspark.mllib.linalg Vectors this fit raised the same error as above:
# IllegalArgumentException: 'requirement failed: Column features must be of type ...'
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
# +-----------------------------------------------------------+
# |pcaFeatures |
# +-----------------------------------------------------------+
# |[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
# |[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
# |[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
# +-----------------------------------------------------------+
spark.stop()